#####################################################################################
#
#  Copyright (c) typedef int GmbH
#  SPDX-License-Identifier: EUPL-1.2
#
#####################################################################################

import datetime
import sys

import psutil
from autobahn.util import utcstr
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.threads import deferToThread
from txaio import make_logger, time_ns

from crossbar.common.checkconfig import check_dict_args

__all__ = ("ProcessMonitor", "SystemMonitor")


class Monitor(object):
    """
    Monitor base class.
    """

    ID = "abstract"
    """
    Sensor ID, must defined in derived class.
    """

    log = make_logger()

    def __init__(self, config=None):
        """

        :param config: Submonitor specific configuration.
        :type config: dict or None
        """
        # submonitor specific configuration
        self._config = config

        # incremented on each poll
        self._tick = 0

        # time of last poll: ns Unix time UTC
        self._last_poll = None

        # effective period corresponding to last poll in ns
        self._last_period = None

        # last polled value
        self._last_value = None

        # total elapsed CPU time in ns reading this sensor
        self._elapsed = 0

    def check(self, config):
        """
        Check submonitor configuration item.

        Override in your derived submonitor class.

        Raise a `crossbar.common.checkconfig.InvalidConfigException` exception
        when you find an error in the item configuration.

        :param config: The submonitor configuration item to check.
        :type config: dict
        """
        check_dict_args({}, config, "{} monitor configuration".format(self.ID))

    def poll(self):
        """
        Measure current stats value and return new stats.

        Override in your derived submonitor class.

        :returns: Current stats from monitor.
        :rtype: dict
        """
        self._tick += 1

        now = time_ns()
        if self._last_poll:
            self._last_period = now - self._last_poll

        current = {
            "tick": self._tick,
            # the UTC timestamp when measurement was taken
            "timestamp": now,
            # the effective last period in ns
            "last_period": self._last_period,
            # duration in seconds the retrieval of sensor values took
            "elapsed": self._elapsed,
        }

        self._last_poll = now
        self._last_value = current

        return current

    def get(self, details=None):
        """
        Get last stats/mesasurement values.

        Usually, there is no need to override this in a derived submonitor, as
        the default implementation already handles storing and returning the
        last submonitor reading.

        :returns: Last stats/values from monitor.
        :rtype: dict or None (when not yet polled)
        """
        self.log.info("{klass}.get(details={})", klass=self.__class__.__name__, details=details)
        return self._last_value


class ProcessMonitor(Monitor):
    ID = "process"

    def __init__(self, worker_type, config):
        Monitor.__init__(self, config)
        self._p = psutil.Process()
        self._worker_type = worker_type

        self._has_io_counters = False
        if not sys.platform.startswith("darwin"):
            if hasattr(self._p, "io_counters"):
                try:
                    self._p.io_counters()
                    self._has_io_counters = True
                except psutil.AccessDenied:
                    pass

    @inlineCallbacks
    def poll(self, verbose=False):
        """
        Measure current stats value and return new stats.

        :returns: A deferred that resolves with a dict containing new process statistics.
        :rtype: :class:`twisted.internet.defer.Deferred`
        """
        self._tick += 1

        now = time_ns()
        if self._last_poll:
            self._last_period = now - self._last_poll

        if verbose:
            _current = {
                "tick": self._tick,
                # the UTC timestamp when measurement was taken
                "timestamp": now,
                # the effective last period in ns
                "last_period": self._last_period,
                # duration in seconds the retrieval of sensor values took
                "elapsed": self._elapsed,
            }
        else:
            _current = {}

        def _poll(current, last_value):
            # normalize with effective period
            diff = 1.0
            if self._last_period:
                diff = self._last_period / 10**9

            # cmd_started = time.time()

            current["type"] = self._worker_type
            current["pid"] = self._p.pid
            current["status"] = self._p.status()

            if verbose:
                current["exe"] = self._p.exe()
                current["user"] = self._p.username()
                current["name"] = self._p.name()
                current["cmdline"] = " ".join(self._p.cmdline())
                created = self._p.create_time()
                current["created"] = utcstr(datetime.datetime.fromtimestamp(created))

            current["num_fds"] = self._p.num_fds()
            current["num_threads"] = self._p.num_threads()
            current["num_fds"] = self._p.num_fds()

            # the following values are cumulative since process creation!
            #
            num_ctx_switches = self._p.num_ctx_switches()
            current["num_ctx_switches_voluntary"] = num_ctx_switches.voluntary
            current["num_ctx_switches_involuntary"] = num_ctx_switches.involuntary

            if self._has_io_counters:
                iocounters = self._p.io_counters()
                current["read_ios"] = iocounters.read_count
                current["write_ios"] = iocounters.write_count
                current["read_bytes"] = iocounters.read_bytes
                current["write_bytes"] = iocounters.write_bytes
            else:
                current["read_ios"] = None
                current["write_ios"] = None
                current["read_bytes"] = None
                current["write_bytes"] = None

            cpu = self._p.cpu_times()
            current["cpu_user"] = cpu.user
            current["cpu_system"] = cpu.system

            # current['command_duration'] = time.time() - cmd_started

            for key in [
                "read_ios",
                "write_ios",
                "read_bytes",
                "write_bytes",
                "cpu_user",
                "cpu_system",
                "num_ctx_switches_voluntary",
                "num_ctx_switches_involuntary",
            ]:
                if last_value and last_value[key] is not None:
                    value = float(current[key] - last_value[key]) / diff
                    current["{}_per_sec".format(key)] = int(value)

            return current

        new_value = yield deferToThread(_poll, _current, self._last_value)

        self._last_poll = now
        self._last_value = new_value

        returnValue(new_value)


class SystemMonitor(Monitor):
    """
    System monitoring via psutils.
    """

    ID = "system"

    @inlineCallbacks
    def poll(self, verbose=False):
        """
        Measure current stats value and return new stats.

        :returns: A deferred that resolves with a dict containing new process statistics.
        :rtype: :class:`twisted.internet.defer.Deferred`
        """
        self._tick += 1

        now = time_ns()
        if self._last_poll:
            self._last_period = now - self._last_poll

        if verbose:
            _current = {
                "tick": self._tick,
                # the UTC timestamp when measurement was taken
                "timestamp": now,
                # the effective last period in ns
                "last_period": self._last_period,
                # duration in seconds the retrieval of sensor values took
                "elapsed": self._elapsed,
            }
        else:
            _current = {}

        # uptime, as all durations, is in ns
        _current["uptime"] = int(now - psutil.boot_time() * 10**9)

        def _poll(current, last_value):
            # normalize with effective period
            diff = 1.0
            if self._last_period:
                diff = self._last_period / 10**9

            # int values: bytes_sent, bytes_recv, packets_sent, packets_recv, errin, errout, dropin, dropout
            current["network"] = dict(psutil.net_io_counters()._asdict())

            # int values: read_count, write_count, read_bytes, write_bytes, read_time, write_time, read_merged_count, write_merged_count, busy_time
            current["disk"] = dict(psutil.disk_io_counters()._asdict())

            if last_value:
                for k in ["network", "disk"]:
                    d = current[k]
                    for k2 in list(d.keys()):
                        value = float(d[k2] - last_value[k][k2]) / diff
                        d["{}_per_sec".format(k2)] = int(value)

            # float values: user, nice, system, idle, iowait, irq, softirq, streal, guest, guest_nice
            current["cpu"] = dict(psutil.cpu_times_percent(interval=None)._asdict())

            cpu_freq = psutil.cpu_freq()
            current["cpu"]["freq"] = round(cpu_freq.current) if cpu_freq else None
            s = psutil.cpu_stats()
            current["cpu"]["ctx_switches"] = s.ctx_switches
            current["cpu"]["interrupts"] = s.interrupts
            current["cpu"]["soft_interrupts"] = s.soft_interrupts

            # int values: total, available, used, free, active, inactive, buffers, cached, shared, slab
            # float values: percent
            current["memory"] = dict(psutil.virtual_memory()._asdict())

            # Network connections
            res = {}
            conns = psutil.net_connections(kind="all")
            for c in conns:
                if c.family not in res:
                    res[c.family] = 0
                res[c.family] += 1
            res2 = {}
            for f, cnt in res.items():
                res2[f.name] = cnt
            current["network"]["connection"] = res2

            return current

        new_value = yield deferToThread(_poll, _current, self._last_value)

        self._elapsed = time_ns() - now
        new_value["elapsed"] = self._elapsed

        self._last_poll = now
        self._last_value = new_value

        returnValue(new_value)
